package com.ksj.Utils.MqttTool;


import com.ksj.Config.RemoteConfigs;
import com.ksj.Service.MainService;
import org.apache.activemq.ActiveMQConnectionFactory;

import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.Map;

public class MqttTool {
    private static Logger LOG = LoggerFactory.getLogger(MqttTool.class);

    private static MQTT mqtt;
    private static CallbackConnection listenerC;
    private static ConnectionFactory factory;
    private static Connection connection;
    private static Session session;

    public static void init() {
        //初始化操作
        try {
            LOG.info("MqttTool初始化开始~！");
            mqtt = new MQTT();

            String host = RemoteConfigs.getInstance().apollo_host;
            // System.out.println(host);
            String port = RemoteConfigs.getInstance().apollo_prot;
            String username = RemoteConfigs.getInstance().apollo_username;
            String password = RemoteConfigs.getInstance().apollo_password;
//            String bhost = RemoteConfigs.getInstance().apollo_bhost;
//            String bport = RemoteConfigs.getInstance().apollo_bport;
//            (bhost != null && bport != null ? ",tcp://" + bhost + ":" + bport : "")
            String url = "failover:(tcp://" + host + ":" + port +
                    ")?randomize=false&initialReconnectDelay=1000";

            mqtt.setHost(host, Integer.valueOf(port));
            mqtt.setUserName(username);
            mqtt.setPassword(password);
            //System.out.println("***********");


            //连接前清空会话信息
            if (RemoteConfigs.getInstance().apollo_clean_start != null &&
                    RemoteConfigs.getInstance().apollo_clean_start.length() > 0)
                mqtt.setCleanSession(RemoteConfigs.getInstance().apollo_clean_start.equals("0") ? false : true);


            //设置重新连接的次数
            if (RemoteConfigs.getInstance().apollo_reconnect_count != null &&
                    RemoteConfigs.getInstance().apollo_reconnect_count.length() > 0)
                mqtt.setReconnectAttemptsMax(Long.valueOf(RemoteConfigs.getInstance().apollo_reconnect_count));


            //设置重连的间隔时间
            if (RemoteConfigs.getInstance().apollo_reconnect_dealy != null &&
                    RemoteConfigs.getInstance().apollo_reconnect_dealy.length() > 0)
                mqtt.setReconnectDelay(Long.valueOf(RemoteConfigs.getInstance().apollo_reconnect_dealy));
            //设置心跳时间
            if (RemoteConfigs.getInstance().apollo_keep_alive != null &&
                    RemoteConfigs.getInstance().apollo_keep_alive.length() > 0)
                mqtt.setKeepAlive(Short.valueOf(RemoteConfigs.getInstance().apollo_keep_alive));

            //设置缓冲的大小
            if (RemoteConfigs.getInstance().apollo_buffer_size != null &&
                    RemoteConfigs.getInstance().apollo_buffer_size.length() > 0)
                mqtt.setSendBufferSize(Integer.valueOf(RemoteConfigs.getInstance().apollo_buffer_size) * 1024 * 1024);


            factory = new ActiveMQConnectionFactory(username, password, url);
            connection = factory.createConnection();
            connection.start();


            //创建一个session
            //第一个参数:是否支持事务，如果为true，则会忽略第二个参数，被jms服务器设置为SESSION_TRANSACTED
            //第二个参数为false时，paramB的值可为Session.AUTO_ACKNOWLEDGE，Session.CLIENT_ACKNOWLEDGE，DUPS_OK_ACKNOWLEDGE其中一个。
            //Session.AUTO_ACKNOWLEDGE为自动确认，客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常，也会被当作正常发送成功。
            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后，必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功，并删除消息。
            //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回，会话对象就会确认消息的接收；而且允许重复确认。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            listenerC = mqtt.callbackConnection();
            listenerC.connect(new Callback<Void>() {

                public void onSuccess(Void value) {
                    LOG.info("连接" + mqtt.getHost() + "成功!");
                }


                public void onFailure(Throwable value) {
                    LOG.info("连接" + mqtt.getHost() + "失败!");
                    LOG.error("apollo连接失败！",value);

                }
            });
        } catch (JMSException e) {
            LOG.error("JMS异常！",e);
        } catch (URISyntaxException e) {
            LOG.error("URL语法异常！",e);
        }
        LOG.info("MqttTool初始化完成~！");
    }


    //暂停mqtt服务
    public static void stop(boolean isRestart) {
        if (listenerC != null) {
            listenerC.disconnect(new Callback<Void>() {
                @Override
                public void onSuccess(Void value) {

                    LOG.info("断开连接成功~!");
                    if (isRestart)
                        MainService.startService();
                }

                @Override
                public void onFailure(Throwable value) {
                    LOG.info("断开连接失败~!");
                    LOG.error("apollo连接失败！",value);
                }
            });
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                LOG.error("session关闭错误！",e);
            }
        }

        if (connection != null) {
            try {
                connection.stop();
                connection.close();
            } catch (JMSException e) {
                LOG.error("connection关闭错误！",e);
            }
        }
    }

    //发送一条队列消息
    public static void sendQueueMsg(String queue, String msg) {
        //创建一个到达的目的地，其实想一下就知道了，activemq不可能同时只能跑一个队列吧，这里就是连接了一个名为"text-msg"的队列，这个会话将会到这个队列，当然，如果这个队列不存在，将会被创建
        if (session != null) {
            Destination destination;
            MessageProducer producer = null;

            try {
                destination = session.createQueue(queue);
                //从session中，获取一个消息生产者
                producer = session.createProducer(destination);
                //设置生产者的模式，有两种可选
                //DeliveryMode.PERSISTENT 当activemq关闭的时候，队列数据将会被保存
                //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候，队列里面的数据将会被清空
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                //创建一条消息，当然，消息的类型有很多，如文字，字节，对象等,可以通过session.create..方法来创建出来
                TextMessage textMsg = session.createTextMessage(msg);
                producer.send(textMsg);


                LOG.debug("发送" + queue + "队列消息成功!");

            } catch (JMSException e) {
                LOG.error("JMS异常！",e);
            }
        } else {
            LOG.error("session为空！");
        }
    }

    //订阅一条队列消息
    public static void subscribeQueueMsg(String queue, MessageListener listener) {
        if (session != null) {
            Destination destination;
            MessageConsumer consumer = null;
            //创建一个到达的目的地，其实想一下就知道了，activemq不可能同时只能跑一个队列吧，这里就是连接了一个名为"text-msg"的队列，这个会话将会到这个队列，当然，如果这个队列不存在，将会被创建
            try {
                destination = session.createQueue(queue);
                //根据session，创建一个接收者对象
                consumer = session.createConsumer(destination);
                consumer.setMessageListener(listener);
            } catch (JMSException e) {
                LOG.error("JMS异常！",e);
            }
        } else {
            LOG.error("session为空！");
        }

    }

    //发送单条消息
    public static void sendMsg(String topic, String msg) {

        //发布消息
        if (listenerC != null) {
            listenerC.publish(topic, msg.getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() {
                public void onSuccess(Void v) {
                    LOG.info("发送消息成功~!");
                }

                public void onFailure(Throwable value) {
                    LOG.info("发送消息失败~!");
                }
            });
        }

    }


    //发送多条消息
    public static void sendMsgs(Map<String, String> topicAndVal) {
        if (listenerC != null) {
            for (String topic : topicAndVal.keySet()) {
                String msg = topicAndVal.get(topic);
                listenerC.publish(topic, msg.getBytes(), QoS.AT_LEAST_ONCE, true, new Callback<Void>() {
                    public void onSuccess(Void v) {
                        LOG.info("发送消息成功~!");
                    }

                    public void onFailure(Throwable value) {
                        LOG.info("发送消息失败~!");
                    }
                });
            }

        }

    }

    //订阅一条消息
    public static void listenMgs(String topic, Listener listener) {
        //CallbackConnection listenerC = getConn();

        if (listenerC != null) {
            org.fusesource.mqtt.client.Topic[] topics = {new org.fusesource.mqtt.client.Topic(topic, QoS.AT_LEAST_ONCE)};
            listenerC.listener(listener);
            listenerC.subscribe(topics, new Callback<byte[]>() {
                public void onSuccess(byte[] qoses) {
                    LOG.debug("订阅成功!");

                }

                public void onFailure(Throwable value) {
                    LOG.debug("订阅失败!");
                    LOG.error("订阅 "+topic+" topic失败",value);
                }
            });
        }

    }

    //订阅多条消息
    public static void listenMgses(Map<String, Listener> map) {
        if (listenerC != null) {
            for (String key : map.keySet()) {
                //CallbackConnection listenerC = getConn();
                org.fusesource.mqtt.client.Topic[] topics = {new org.fusesource.mqtt.client.Topic(key, QoS.AT_LEAST_ONCE)};
                listenerC.listener(map.get(key));
                listenerC.subscribe(topics, new Callback<byte[]>() {
                    public void onSuccess(byte[] qoses) {
                        LOG.debug("订阅成功!");
                    }

                    public void onFailure(Throwable value) {
                        LOG.debug("订阅失败!");
                        LOG.error("订阅 "+key+" topic失败",value);
                    }
                });
            }
        }


    }

    //取消订阅
    public static void unSubscribe(String... topics) {
        UTF8Buffer[] buffers = new UTF8Buffer[topics.length];
        for (int i = 0; i < topics.length; i++) {
            buffers[i] = new UTF8Buffer(topics[i]);
        }

        listenerC.unsubscribe(buffers, new Callback<Void>() {

            public void onSuccess(Void aVoid) {
                LOG.debug("取消订阅成功");
            }


            public void onFailure(Throwable throwable) {
                LOG.debug("取消订阅失败!");
                LOG.error("订阅 "+topics+" topic失败",throwable);
            }
        });
    }

    public static void lockClass() {
        synchronized (MqttTool.class) {
            while (true) {
                try {
                    MqttTool.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static CallbackConnection getConn() {
        CallbackConnection listenerC;
        listenerC = mqtt.callbackConnection();
        listenerC.connect(new Callback<Void>() {

            public void onSuccess(Void value) {
                System.out.println("连接成功！");
            }

            public void onFailure(Throwable value) {
                System.out.println("连接失败！");
                LOG.error("apollo链接失败",value);

            }
        });
        return listenerC;
    }

}
